/*
 *
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package xds_test

import (
	"context"
	"fmt"
	"testing"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/stubserver"
	"google.golang.org/grpc/internal/testutils"
	"google.golang.org/grpc/internal/testutils/rls"
	"google.golang.org/grpc/internal/testutils/xds/e2e"
	"google.golang.org/protobuf/types/known/durationpb"

	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
	rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
	testgrpc "google.golang.org/grpc/interop/grpc_testing"
	testpb "google.golang.org/grpc/interop/grpc_testing"

	_ "google.golang.org/grpc/balancer/rls" // Register the RLS Load Balancing policy.
)

// defaultClientResourcesWithRLSCSP returns a set of resources (LDS, RDS, CDS, EDS) for a
// client to connect to a server with a RLS Load Balancer as a child of Cluster Manager.
func defaultClientResourcesWithRLSCSP(t *testing.T, lb e2e.LoadBalancingPolicy, params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions {
	routeConfigName := "route-" + params.DialTarget
	clusterName := "cluster-" + params.DialTarget
	endpointsName := "endpoints-" + params.DialTarget
	return e2e.UpdateOptions{
		NodeID:    params.NodeID,
		Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)},
		Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
			RouteConfigName:            routeConfigName,
			ListenerName:               params.DialTarget,
			ClusterSpecifierType:       e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
			ClusterSpecifierPluginName: "rls-csp",
			ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &rlspb.RouteLookupClusterSpecifier{
				RouteLookupConfig: rlsProto,
			}),
		})},
		Clusters: []*v3clusterpb.Cluster{e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
			ClusterName:   clusterName,
			ServiceName:   endpointsName,
			Policy:        lb,
			SecurityLevel: params.SecLevel,
		})},
		Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, []uint32{params.Port})},
	}
}

// TestRLSinxDS tests an xDS configured system with an RLS Balancer present.
//
// This test sets up the RLS Balancer using the RLS Cluster Specifier Plugin,
// spins up a test service and has a fake RLS Server correctly respond with a
// target corresponding to this test service. This test asserts an RPC proceeds
// as normal with the RLS Balancer as part of system.
func (s) TestRLSinxDS(t *testing.T) {
	tests := []struct {
		name     string
		lbPolicy e2e.LoadBalancingPolicy
	}{
		{
			name:     "roundrobin",
			lbPolicy: e2e.LoadBalancingPolicyRoundRobin,
		},
		{
			name:     "ringhash",
			lbPolicy: e2e.LoadBalancingPolicyRingHash,
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			testRLSinxDS(t, test.lbPolicy)
		})
	}
}

func testRLSinxDS(t *testing.T, lbPolicy e2e.LoadBalancingPolicy) {
	internal.RegisterRLSClusterSpecifierPluginForTesting()
	defer internal.UnregisterRLSClusterSpecifierPluginForTesting()

	// Set up all components and configuration necessary - management server,
	// xDS resolver, fake RLS Server, and xDS configuration which specifies an
	// RLS Balancer that communicates to this set up fake RLS Server.
	managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
	defer cleanup1()

	server := stubserver.StartTestService(t, nil)
	defer server.Stop()

	lis := testutils.NewListenerWrapper(t, nil)
	rlsServer, rlsRequestCh := rls.SetupFakeRLSServer(t, lis)
	rlsProto := &rlspb.RouteLookupConfig{
		GrpcKeybuilders:      []*rlspb.GrpcKeyBuilder{{Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}}}},
		LookupService:        rlsServer.Address,
		LookupServiceTimeout: durationpb.New(defaultTestTimeout),
		CacheSizeBytes:       1024,
	}

	const serviceName = "my-service-client-side-xds"
	resources := defaultClientResourcesWithRLSCSP(t, lbPolicy, e2e.ResourceParams{
		DialTarget: serviceName,
		NodeID:     nodeID,
		Host:       "localhost",
		Port:       testutils.ParsePort(t, server.Address),
		SecLevel:   e2e.SecurityLevelNone,
	}, rlsProto)

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := managementServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Configure the fake RLS Server to set the RLS Balancers child CDS
	// Cluster's name as the target for the RPC to use.
	rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rls.RouteLookupResponse {
		return &rls.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{"cluster-" + serviceName}}}
	})

	// Create a ClientConn and make a successful RPC.
	cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
	if err != nil {
		t.Fatalf("failed to dial local test server: %v", err)
	}
	defer cc.Close()

	client := testgrpc.NewTestServiceClient(cc)
	// Successfully sending the RPC will require the RLS Load Balancer to
	// communicate with the fake RLS Server for information about the target.
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("rpc EmptyCall() failed: %v", err)
	}

	// These RLS Verifications makes sure the RLS Load Balancer is actually part
	// of the xDS Configured system that correctly sends out RPC.

	// Verify connection is established to RLS Server.
	if _, err = lis.NewConnCh.Receive(ctx); err != nil {
		t.Fatal("Timeout when waiting for RLS LB policy to create control channel")
	}

	// Verify an rls request is sent out to fake RLS Server.
	select {
	case <-ctx.Done():
		t.Fatalf("Timeout when waiting for an RLS request to be sent out")
	case <-rlsRequestCh:
	}
}
